#include "config.h"

#include <sys/types.h>
#include <regex.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <sys/vfs.h>
#include <unistd.h>
#include <getopt.h>
#include <sys/statvfs.h>

#define DBG_SUBSYS S_LIBCLUSTER

#include "sysy_lib.h"
#include "env.h"
#include "configure.h"
#include "fence.h"
#include "lichconf.h"
#include "job_dock.h"
#include "minirpc.h"
#include "rpc_proto.h"
#include "net_global.h"
#include "etcd.h"
#include "../nodepool/nodeid.h"
#include "metadata.h"
#include "cluster.h"
#include "../dispatch/dispatch.h"
#include "net_table.h"
#include "conn.h"
#include "ynet_rpc.h"
#include "node.h"
#include "ylog.h"
#include "disk.h"
#include "bh_task.h"
#include "rmsnap_bh.h"
#include "stor_root.h"
#include "system.h"
#include "../../storage/storage/local_vol.h"
#include "../../storage/controller/castoff.h"
#include "lease_ctl.h"
#include "recovery.h"
#include "dbg.h"
#include "storage_status.h"

#define QUORUM_SYNC_INTERVAL 10
#define ADMIN_BH_WAIT (60 * 10)
#define NODE_HB_INTERVAL 10

extern node_t __node__;

int node_srv_getinfo(nodeinfo_t *_info, char *buf)
{
        int ret;
        char status[MAX_NAME_LEN], dfinfo[MAX_BUF_LEN];
        uint32_t buflen, dflen;
        nodestat_t stat;
        nodedfree_t *dfree;
        nid_t _admin, local;

        ret = ymalloc((void **)&dfree, sizeof(*dfree));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = node_stat(&stat, dfree);
        if (unlikely(ret))
                GOTO(err_free, ret);

        _admin = *net_getadmin();
        local = *net_getnid();
        if (!net_isnull(&_admin) && nid_cmp(&_admin, &local) == 0) {
                strcpy(status, NODE_STATUS_ADMIN);
        } else {
                if (etcd_is_proxy()) {
                        strcpy(status, NODE_STATUS_NORMAL);
                } else {
                        strcpy(status, NODE_STATUS_META);
                }
        }

        dflen = MAX_BUF_LEN;
        dfinfo_encode(dfinfo, &dflen, dfree);

        buflen = MAX_BUF_LEN;
        _opaque_encode(buf, &buflen,
                       __node__.name, strlen(__node__.name) + 1,
                       __node__.clustername, strlen(__node__.clustername) + 1,
                       status, strlen(status) + 1,
                       ng.home, strlen(ng.home) + 1,
                       &stat, sizeof(nodestat_t) + 1,
                       dfinfo, dflen,
                       &ng.admin_uptime, sizeof(uint32_t),
                       __node__.admin, strlen(__node__.admin) + 1,
                       NULL);

        nodeinfo_decode(buf, buflen, _info);

        YASSERT(buflen < MAX_BUF_LEN);
        YASSERT(_info->clustername);

        DBUG("node info:\n"
             "cluster:%s\n"
             "hostname:%s\n"
             "status:%s\n"
             "uptime:%llu\n",
             _info->clustername,
             _info->nodename,
             _info->status,
             (LLU)_info->uptime);

        yfree((void **)&dfree);

        return 0;
err_free:
        yfree((void **)&dfree);
err_ret:
        return ret;
}

int node_srv_init(const char *home)
{
        int ret;
        char path[MAX_PATH_LEN], port[MAX_NAME_LEN], tmp[MAX_NAME_LEN];
        struct stat stbuf;
        nid_t nid;

        DINFO("node srv init\n");

        ret = stat(home, &stbuf);
        if (ret < 0) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        strcpy(__node__.home, home);

        snprintf(port, sizeof(port), "%d", gloconf.control_port);

        ret = minirpc_init(NULL, port);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = net_gethostname(__node__.name, MAX_NAME_LEN);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        strcpy(ng.name, __node__.name);

        snprintf(path, MAX_NAME_LEN, "%s/config/clustername", __node__.home);

        ret = _get_value(path, __node__.clustername, MAX_BUF_LEN);
        if (ret < 0) {
                ret = -ret;
                GOTO(err_ret, ret);
        }

        sprintf(path, "%s/config/nid", __node__.home);

        ret = _get_text(path, tmp, MAX_NAME_LEN);
        if (ret < 0) {
                ret = -ret;
                if (ret == ENOENT) {
                        memset(&nid, 0x0, sizeof(nid));
                } else
                        GOTO(err_ret, ret);
        } else {
                str2nid(&nid, tmp);
                net_setnid(&nid);
        }

        ret = rpc_passive(gloconf.data_port);
        if (unlikely(ret)) {
                sleep(1);
                EXIT(EAGAIN);
                GOTO(err_ret, ret);
        }

        DINFO("rpc_passive port: %d \n", ng.port);
        DINFO("set local nid "NID_FORMAT"\n", NID_ARG(&nid));

        ret = node_start();
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

int node_srv_heartbeat(int force)
{
        int ret;
        nodestat_t stat;
        nodedfree_t *dfree;
        char dfinfo[MAX_BUF_LEN];
        static uint32_t prev = 0, prev_ltime = 0;
        uint32_t admin_uptime, now = gettime();
        uint32_t dflen;
        time_t ltime;

        if (net_isnull(net_getadmin())) {
                return 0;
        }

        if (strlen(__node__.status) == 0) {
                DERROR("__node__.status not inited\n");
                return 0;
        }

        ret = network_connect(net_getadmin(), &ltime, 0, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);
        
        if (force == 0 && prev_ltime == ltime && (now - prev < NODE_HB_INTERVAL)) {
                return 0;
        }

        ret = ymalloc((void **)&dfree, sizeof(*dfree));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = node_stat(&stat, dfree);
        if (unlikely(ret))
                GOTO(err_free, ret);

        dflen = MAX_BUF_LEN;
        dfinfo_encode(dfinfo, &dflen, dfree);

        yfree((void **)&dfree);

        if (net_isnull(&stat.nid)) {
                ret = EAGAIN;
                GOTO(err_ret, ret);
        }

        ret = dispatch_heartbeat(__node__.name, dfinfo, &stat, __node__.status, &admin_uptime);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        DBUG("heartbeat, prev %u, writeable %u\n", prev,
             stat.status & __NODE_STAT_WRITEABLE__);

        ng.admin_uptime = admin_uptime;
        prev = now;
        prev_ltime = ltime;

        return 0;
err_free:
        yfree((void **)&dfree);
err_ret:
        return ret;
}

static void __node_srv_heartbeat_wait()
{
        int ret, retry = 0;

        while (retry < gloconf.master_timeout / 2) {
                ret = node_srv_heartbeat(1);
                if (unlikely(ret)) {
                        DINFO("node heartbeat fail %d\n", ret);
                        sleep(1);
                        retry++;
                        continue;
                }

                break;
        }
}

typedef struct {
        int running;
        sem_t sem;
        sem_t stoped;
} arg_t;

static void *__node_normal_run(void *_arg)
{
        int ret, retry = 0;
        arg_t *arg = _arg;

        DINFO("normal heartbeat service start\n");

        while (arg->running) {
                ret = network_connect_master();
                if (unlikely(ret)) {
                        DWARN("node heartbeat fail %d, retry %u\n", ret, retry);
                        sleep(1);
                        retry++;
                        continue;
                }
                
                ret = node_srv_heartbeat(1);
                if (unlikely(ret)) {
                        DWARN("node heartbeat fail %d, retry %u\n", ret, retry);
                        sleep(1);
                        retry++;
                        continue;
                }

                break;
        }
        
        while (arg->running) {
                network_connect_master();
                node_srv_heartbeat(0);

                ret = _sem_timedwait1(&arg->sem, NODE_HB_INTERVAL / 4);
                if (ret) {
                        if (ret == ETIMEDOUT)
                                continue;
                        else
                                UNIMPLEMENTED(__DUMP__);
                }
        }

        DINFO("normal node stop\n");

        sem_post(&arg->stoped);
        pthread_exit(NULL);
}

static int __node_normal_start(arg_t *arg)
{
        int ret;

        ret = sem_init(&arg->sem, 0, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = sem_init(&arg->stoped, 0, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        arg->running = 1;

        ret = sy_thread_create2(__node_normal_run, arg, "__node_normal_run");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

static int __node_normal_stop(arg_t *arg)
{
        int ret;

        arg->running = 0;
        sem_post(&arg->sem);

        ret = sem_wait(&arg->stoped);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

static int __node_normal_create(const char *master)
{
        int ret;
        char tmp[MAX_BUF_LEN], path[MAX_PATH_LEN];
        nid_t nid;

        snprintf(path, MAX_NAME_LEN, "%s/config/nid", __node__.home);
        ret = _get_text(path, tmp, MAX_NAME_LEN);
        if (ret < 0) {
                ret = -ret;
                if (ret == ENOENT) {
                        DINFO("create nid\n");
                } else
                        GOTO(err_ret, ret);
        } else {
                goto out;
        }

        ret = node_rpc_admin_join(master, __node__.name);
        if (ret) {
                if (ret == EEXIST) {
                        DWARN("%s exist\n", __node__.name);
                } else
                        GOTO(err_ret, ret);
        }

        ret = etcd_get_text(ETCD_NODE, __node__.name, tmp, NULL);
        if (ret)
                GOTO(err_ret, ret);

        str2nid(&nid, tmp);
        net_setnid(&nid);
        ret = _set_text(path, tmp, strlen(tmp) + 1, O_CREAT);
        if (unlikely(ret)) {
                UNIMPLEMENTED(__DUMP__);
                GOTO(err_ret, ret);
        }

        DINFO("set local nid "NID_FORMAT"\n", NID_ARG(net_getnid()));

out:
        return 0;
err_ret:
        return ret;
}

int node_srv_none_pooldrop(const char *pool)
{
        int ret, find;

        DINFO("drop pool %s\n", pool);

        ret = system_pool_find(pool, &find);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        // 如果etcd相关记录没有清除，返回错误
        if (find) {
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        ret = stor_root_del(pool);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

static int __node_normal_connect(const char *master, nid_t *_nid)
{
        int ret;
        char key[MAX_NAME_LEN], buf[MAX_BUF_LEN];
        ynet_net_info_t *info;
        nid_t nid;

        ret = etcd_get_text(ETCD_NODE, master, key, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        str2nid(&nid, key);
        info = (void *)buf;
        ret = conn_getinfo(&nid, info);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        YASSERT(nid_cmp(&nid, &info->id) == 0);
        YASSERT(strcmp(master, info->name) == 0);
        DINFO("set admin %u %s\n", nid.id, master);
        net_setadmin(&info->id);

#if 0
        net_handle_t nh;
        ret = netable_connect_info(&nh, info, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);
#endif

        *_nid = nid;
        
        return 0;
err_ret:
        return ret;
}

static int __node_ismaster(const char *master, const nid_t *nid)
{
        int ret, ismaster;

        if (!netable_connected(nid)) {
                return 0;
        }

        ret = node_rpc_admin_check(master, &ismaster);
        if (unlikely(ret))
                return 0;

        return ismaster;
}

int node_normal_start(etcd_lock_t *lock, const char *master, const int *_idx)
{
        int ret, idx;
        arg_t arg;
        char newmaster[MAX_NAME_LEN];
        nid_t nid;
        uint32_t magic;

        DINFO("start normal, admin %s\n", master);
        SINFO(0, "%s, start normal, admin %s\n", M_FUSIONSTOR_START, master);

        strcpy(__node__.status, NODE_STATUS_NORMAL);
        strcpy(__node__.admin, master);

        ret = __node_normal_create(master);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __node_normal_connect(master, &nid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

#if !ENABLE_START_PARALLEL
        ret = conn_register();
        if (unlikely(ret))
                GOTO(err_ret, ret);
#endif

        ret = __node_normal_start(&arg);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        idx = *_idx;
        idx++;
        while (1) {
                ret = etcd_lock_watch(lock, newmaster, &magic, &idx);
                if (unlikely(ret)) {
                        if (__node_ismaster(master, &nid)) {
                                DINFO("%s still keep master\n", master);
                                sleep(1);
                                continue;
                        } else {
                                GOTO(err_stop, ret);
                        }
                }


                YASSERT(nid_cmp(net_getadmin(), &nid) == 0);
                if (ng.master_magic != magic) {
                        DINFO("set master magic 0x%x --> 0x%x\n", ng.master_magic, magic);
                        ng.master_magic = magic;
                }

                idx++;
                DBUG("watch return old %s new %s idx %u\n", master, newmaster, idx);

                if (strcmp(newmaster, master)) {
                        DINFO("master swaped old %s new %s\n", master, newmaster);
                        break;
                }
        }

        ret = __node_normal_stop(&arg);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_stop:
        __node_normal_stop(&arg);
err_ret:
        return ret;
}

int node_srv_none_castoff(int idx)
{
        return castoff_disk(idx);
}
